package com.xiaofan

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis


/**
 * Entry point of the UV (unique visitor) job that de-duplicates users with a Redis bitmap.
 *
 * Reads `UserBehavior` CSV lines, keeps only "pv" events, and counts distinct user ids per
 * one-hour event-time window using [[MyTrigger]] and [[UvCountWithBloom]].
 */
object UvWithBloom {

  // Input used when no path is passed on the command line (the previously hard-coded location).
  private val DefaultInputPath =
    "D:\\big-data\\code\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv"

  /**
   * Builds and runs the job.
   *
   * @param args optional; `args(0)` overrides the input CSV path
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    val dataStream: DataStream[UserBehavior] = inputStream
      .map {
        data => {
          val arr: Array[String] = data.split(",")
          UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
        }
      }
      // the CSV timestamp is multiplied by 1000, so it is presumably in seconds
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val resultStream: DataStream[UvCount] = dataStream
      .filter(_.behavior == "pv")
      // every record gets the same constant key so all users land in one keyed window
      .map(data => ("uv", data.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())

    resultStream.print()

    env.execute("uv job with bloom!")
  }
}

/**
 * Trigger that fires the window on every incoming element and purges its contents at once,
 * so each window-function call sees only the element that caused the firing.
 * The timer callbacks do nothing because this trigger never registers a timer.
 */
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {

  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // Nothing was registered on the context, so there is nothing to clean up.
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
}


/**
 * Minimal bloom-filter helper: provides only the hash that maps a string to a bit offset
 * inside a bitmap of `size` bits. The bitmap itself is kept elsewhere.
 *
 * @param size number of bits in the bitmap; must be positive
 */
class Bloom(size: Long) extends Serializable {
  require(size > 0, s"bloom filter size must be positive, got $size")

  private val cap = size

  /**
   * Polynomial string hash mapped into `[0, cap)`.
   *
   * The accumulator is an `Int` and is allowed to wrap on overflow (as before), so it may be
   * negative. `floorMod` then maps it into range: for power-of-two caps this equals the former
   * `(cap - 1) & result`, and unlike the mask it also stays uniform for any other cap.
   *
   * @param value string to hash
   * @param seed  polynomial multiplier
   * @return bit offset in `[0, cap)`
   */
  def hash(value: String, seed: Int): Long = {
    val raw: Int = value.foldLeft(0)((acc, ch) => acc * seed + ch)
    java.lang.Math.floorMod(raw.toLong, cap)
  }
}


/**
 * Window function that de-duplicates user ids with a Redis bitmap and keeps the running
 * UV count of each window in Redis, emitting the current count after every call.
 *
 * Redis layout used here:
 *  - bitmap key: the window end timestamp as a string;
 *  - count: hash `uvcount`, field = window end timestamp, value = count as a string.
 *
 * The "bloom filter" uses a single hash function, so distinct users that collide on the
 * same bit are counted once (the count may under-estimate).
 */
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
  // Redis connection and hash helper, created lazily on first use
  lazy val jedis = new Jedis("192.168.157.11", 6379)

  // Number of bits: 2^6 (64) * 2^20 (1M) * 2^3 (8 bits per byte) = 64 MB bitmap
  lazy val bloomFilter = new Bloom(1 << 29)

  /**
   * Marks every element's user id in the window's bitmap, increments the stored count for
   * ids whose bit was not yet set, and emits `UvCount(windowEnd, count)`.
   */
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    val windowEnd = context.window.getEnd
    // the same key is used for the bitmap and for the count field in the `uvcount` hash
    val windowKey = windowEnd.toString
    val uvCountMap = "uvcount"

    // single round-trip; a missing field means no visitor has been counted yet
    val storedCount = Option(jedis.hget(uvCountMap, windowKey)).map(_.toLong).getOrElse(0L)

    val count = elements.foldLeft(storedCount) { case (acc, (_, userId)) =>
      // the hash value is the bit offset inside the window's bitmap
      val offset = bloomFilter.hash(userId.toString, 61)
      if (jedis.getbit(windowKey, offset)) acc
      else {
        jedis.setbit(windowKey, offset, true)
        acc + 1
      }
    }

    if (count != storedCount) {
      jedis.hset(uvCountMap, windowKey, count.toString)
    }

    out.collect(UvCount(windowEnd, count))
  }

  /** Releases the Redis connection when the task shuts down. */
  override def close(): Unit = {
    jedis.close()
    super.close()
  }
}

